
Commit 02b0f83

frifriSF59 authored and edgao committed
[BigQuery] BulkLoader config (#58087)
1 parent 05d19cc commit 02b0f83

9 files changed: +176 -39 lines


airbyte-cdk/bulk/toolkits/load-gcs/src/main/kotlin/io/airbyte/cdk/load/command/gcs/GcsAuthSpecification.kt

+2-2
@@ -17,11 +17,11 @@ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
     include = JsonTypeInfo.As.EXISTING_PROPERTY,
     property = "credential_type"
 )
-@JsonSubTypes(JsonSubTypes.Type(value = GcsHmacKeySpecification::class, name = "HMAC key"))
+@JsonSubTypes(JsonSubTypes.Type(value = GcsHmacKeySpecification::class, name = "HMAC_KEY"))
 sealed class GcsAuthSpecification(
     @JsonSchemaTitle("Credential Type")
     @get:JsonProperty("credential_type")
-    val credentialType: Type
+    val credentialType: Type = Type.HMAC_KEY
 ) {
     enum class Type(@get:JsonValue val authTypeName: String) {
         HMAC_KEY("HMAC_KEY"),
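
With the discriminator renamed from "HMAC key" to "HMAC_KEY" and a default supplied for credentialType, serialized credential specs now carry the machine-style value. A minimal sketch (not part of this commit) of checking that with plain Jackson; the constructor arguments mirror those used elsewhere in this diff:

    import com.fasterxml.jackson.databind.ObjectMapper
    import io.airbyte.cdk.load.command.gcs.GcsHmacKeySpecification

    // Sketch only: the HMAC credential should now serialize with the renamed discriminator.
    fun main() {
        val json = ObjectMapper()
            .writeValueAsString(GcsHmacKeySpecification(accessKeyId = "key", secretAccessKey = "secret"))
        check("\"credential_type\":\"HMAC_KEY\"" in json) { "unexpected discriminator in: $json" }
    }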

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt

+2-1
@@ -563,6 +563,7 @@ class BigQueryDestination : BaseConnector(), Destination {
 val additionalMicronautEnvs = listOf(AwsToolkitConstants.MICRONAUT_ENVIRONMENT)

 fun main(args: Array<String>) {
+    val additionalMicronautEnvs = listOf(AwsToolkitConstants.MICRONAUT_ENVIRONMENT)
     addThrowableForDeinterpolation(BigQueryException::class.java)
-    AirbyteDestinationRunner.run(*args)
+    AirbyteDestinationRunner.run(*args, additionalMicronautEnvs = additionalMicronautEnvs)
 }

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt

+15
@@ -82,6 +82,7 @@ class BigQueryRecordFormatter {
     }

     companion object {
+        // This is the schema used to represent the final raw table
         val SCHEMA_V2: Schema =
             Schema.of(
                 Field.of(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, StandardSQLTypeName.STRING),
@@ -94,5 +95,19 @@
                 Field.of(JavaBaseConstants.COLUMN_NAME_AB_META, StandardSQLTypeName.STRING),
                 Field.of(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, StandardSQLTypeName.INT64)
             )
+
+        // This schema defines the CSV format used for the load job. It differs from SCHEMA_V2 by
+        // omitting the COLUMN_NAME_AB_LOADED_AT field and by rearranging the column order.
+        val CSV_SCHEMA: Schema =
+            Schema.of(
+                Field.of(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, StandardSQLTypeName.STRING),
+                Field.of(
+                    JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
+                    StandardSQLTypeName.TIMESTAMP
+                ),
+                Field.of(JavaBaseConstants.COLUMN_NAME_AB_META, StandardSQLTypeName.STRING),
+                Field.of(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, StandardSQLTypeName.INT64),
+                Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING),
+            )
     }
 }
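
Since the staged CSV files and the load job must agree on column order, one way to keep them in sync (a sketch, not part of this commit) is to derive the header row from CSV_SCHEMA instead of hard-coding the Airbyte column names:

    import com.google.cloud.bigquery.Schema

    // Sketch only: build a CSV header line from a BigQuery schema's field order.
    fun csvHeader(schema: Schema): String =
        schema.fields.joinToString(",") { it.name }

    // Example: csvHeader(BigQueryRecordFormatter.CSV_SCHEMA)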

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/spec/BigquerySpecification.kt

+7-1
@@ -13,7 +13,9 @@ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import io.airbyte.cdk.command.ConfigurationSpecification
+import io.airbyte.cdk.load.command.gcs.GcsAuthSpecification
 import io.airbyte.cdk.load.command.gcs.GcsCommonSpecification
+import io.airbyte.cdk.load.command.gcs.GcsHmacKeySpecification
 import io.airbyte.cdk.load.command.gcs.GcsRegion
 import io.airbyte.cdk.load.spec.DestinationSpecificationExtension
 import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -116,7 +118,7 @@
 @JsonSchemaDescription(
     "Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO to load your data into BigQuery."
 )
-abstract class GcsStagingSpecification :
+class GcsStagingSpecification :
     GcsCommonSpecification, LoadingMethodSpecification(LoadingMethod.GCS) {
     @get:JsonSchemaTitle("GCS Tmp Files Post-Processing")
     @get:JsonPropertyDescription(
@@ -126,6 +128,10 @@ abstract class GcsStagingSpecification :
     @get:JsonProperty("keep_files_in_gcs-bucket", defaultValue = "Delete all tmp files from GCS")
     @get:JsonSchemaInject(json = """{"order": 3}""")
     val filePostProcessing: GcsFilePostProcessing? = null
+    override val gcsBucketName: String = ""
+    override val path: String = ""
+    override val credential: GcsAuthSpecification =
+        GcsHmacKeySpecification(accessKeyId = "", secretAccessKey = "")
 }

 // bigquery supports a subset of GCS regions.
+41-15
@@ -2,40 +2,51 @@
  * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  */

-package io.airbyte.integrations.destination.bigquery
+package io.airbyte.integrations.destination.bigquery.write.bulk_loader

+import com.google.cloud.bigquery.*
 import com.google.cloud.bigquery.BigQuery
-import com.google.cloud.bigquery.FormatOptions
 import com.google.cloud.bigquery.JobInfo
 import com.google.cloud.bigquery.LoadJobConfiguration
 import io.airbyte.cdk.load.command.DestinationCatalog
 import io.airbyte.cdk.load.command.DestinationStream
-import io.airbyte.cdk.load.file.s3.S3KotlinClient
-import io.airbyte.cdk.load.file.s3.S3Object
+import io.airbyte.cdk.load.file.gcs.GcsBlob
+import io.airbyte.cdk.load.file.gcs.GcsClient
 import io.airbyte.cdk.load.message.StreamKey
 import io.airbyte.cdk.load.write.db.BulkLoader
 import io.airbyte.cdk.load.write.db.BulkLoaderFactory
+import io.airbyte.integrations.destination.bigquery.BigQueryUtils
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
 import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
 import io.airbyte.integrations.destination.bigquery.spec.GcsFilePostProcessing
-import io.airbyte.integrations.destination.bigquery.spec.GcsStagingSpecification
+import io.airbyte.integrations.destination.bigquery.spec.GcsStagingConfiguration
 import io.airbyte.integrations.destination.bigquery.write.TempUtils
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.condition.Condition
+import io.micronaut.context.condition.ConditionContext
 import jakarta.inject.Singleton

 class BigQueryBulkLoader(
-    private val storageClient: S3KotlinClient,
+    private val storageClient: GcsClient,
     private val bigQueryClient: BigQuery,
     private val bigQueryConfiguration: BigqueryConfiguration,
     private val stream: DestinationStream,
-) : BulkLoader<S3Object> {
-    override suspend fun load(remoteObject: S3Object) {
+) : BulkLoader<GcsBlob> {
+    override suspend fun load(remoteObject: GcsBlob) {
         val rawTableId = TempUtils.rawTableId(bigQueryConfiguration, stream.descriptor)
-        val gcsUri = "gs://${remoteObject.keyWithBucketName}"
+        val gcsUri = "gs://${remoteObject.storageConfig.gcsBucketName}/${remoteObject.key}"
+
+        val csvOptions =
+            CsvOptions.newBuilder()
+                .setSkipLeadingRows(1)
+                .setAllowQuotedNewLines(true) // safe for long JSON strings
+                .setAllowJaggedRows(true)
+                .build()

         val configuration =
             LoadJobConfiguration.builder(rawTableId, gcsUri)
-                .setFormatOptions(FormatOptions.csv())
-                .setSchema(BigQueryRecordFormatter.SCHEMA_V2)
+                .setFormatOptions(csvOptions)
+                .setSchema(BigQueryRecordFormatter.CSV_SCHEMA)
                 .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
                 .setJobTimeoutMs(600000L) // 10 min timeout
                 .build()
@@ -52,7 +63,7 @@ class BigQueryBulkLoader(
         }

         val loadingMethodPostProcessing =
-            (bigQueryConfiguration.loadingMethod as GcsStagingSpecification).filePostProcessing
+            (bigQueryConfiguration.loadingMethod as GcsStagingConfiguration).filePostProcessing
         if (loadingMethodPostProcessing == GcsFilePostProcessing.DELETE) {
            storageClient.delete(remoteObject)
        }
@@ -63,15 +74,30 @@
     }
 }

+class BigqueryConfiguredForBulkLoad : Condition {
+    override fun matches(context: ConditionContext<*>): Boolean {
+        val config = context.beanContext.getBean(BigqueryConfiguration::class.java)
+        return config.loadingMethod is GcsStagingConfiguration
+    }
+}
+
 @Singleton
+@Requires(condition = BigqueryConfiguredForBulkLoad::class)
 class BigQueryBulkLoaderFactory(
     private val catalog: DestinationCatalog,
-    private val storageClient: S3KotlinClient,
+    private val storageClient: GcsClient,
     private val bigQueryClient: BigQuery,
     private val bigQueryConfiguration: BigqueryConfiguration
-) : BulkLoaderFactory<StreamKey, S3Object> {
+) : BulkLoaderFactory<StreamKey, GcsBlob> {
+    override val numPartWorkers: Int = 2
+    override val numUploadWorkers: Int = 10
     override val maxNumConcurrentLoads: Int = 1
-    override fun create(key: StreamKey, partition: Int): BulkLoader<S3Object> {
+
+    override val objectSizeBytes: Long = 200 * 1024 * 1024 // 200 MB
+    override val partSizeBytes: Long = 10 * 1024 * 1024 // 10 MB
+    override val maxMemoryRatioReservedForParts: Double = 0.6
+
+    override fun create(key: StreamKey, partition: Int): BulkLoader<GcsBlob> {
         val stream = catalog.getStream(key.stream)
         return BigQueryBulkLoader(storageClient, bigQueryClient, bigQueryConfiguration, stream)
     }
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery.write.bulk_loader
+
+import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfiguration
+import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfigurationProvider
+import io.airbyte.cdk.load.command.aws.AWSArnRoleConfiguration
+import io.airbyte.cdk.load.command.aws.AWSArnRoleConfigurationProvider
+import io.airbyte.cdk.load.command.gcs.GOOGLE_STORAGE_ENDPOINT
+import io.airbyte.cdk.load.command.gcs.GcsClientConfiguration
+import io.airbyte.cdk.load.command.gcs.GcsClientConfigurationProvider
+import io.airbyte.cdk.load.command.gcs.GcsHmacKeyConfiguration
+import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
+import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider
+import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
+import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider
+import io.airbyte.cdk.load.file.NoopProcessor
+import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
+import io.airbyte.integrations.destination.bigquery.spec.GcsStagingConfiguration
+import io.micronaut.context.annotation.Factory
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+import java.io.ByteArrayOutputStream
+
+data class BigqueryBulkLoadConfiguration(
+    val bigQueryConfiguration: BigqueryConfiguration,
+) :
+    ObjectStoragePathConfigurationProvider,
+    ObjectStorageFormatConfigurationProvider,
+    ObjectStorageUploadConfigurationProvider,
+    S3BucketConfigurationProvider,
+    AWSAccessKeyConfigurationProvider,
+    AWSArnRoleConfigurationProvider,
+    GcsClientConfigurationProvider,
+    ObjectStorageCompressionConfigurationProvider<ByteArrayOutputStream> {
+    override val objectStoragePathConfiguration: ObjectStoragePathConfiguration
+    override val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration =
+        CSVFormatConfiguration()
+    override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration =
+        ObjectStorageUploadConfiguration()
+    override val s3BucketConfiguration: S3BucketConfiguration
+    override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration
+    override val awsArnRoleConfiguration: AWSArnRoleConfiguration = AWSArnRoleConfiguration(null)
+    override val gcsClientConfiguration: GcsClientConfiguration =
+        (bigQueryConfiguration.loadingMethod as GcsStagingConfiguration).gcsClientConfig
+    override val objectStorageCompressionConfiguration =
+        ObjectStorageCompressionConfiguration(NoopProcessor)
+
+    init {
+        bigQueryConfiguration.loadingMethod as GcsStagingConfiguration
+        s3BucketConfiguration =
+            S3BucketConfiguration(
+                s3BucketName = bigQueryConfiguration.loadingMethod.gcsClientConfig.gcsBucketName,
+                s3BucketRegion = bigQueryConfiguration.loadingMethod.gcsClientConfig.region,
+                s3Endpoint = GOOGLE_STORAGE_ENDPOINT,
+            )
+        val credentials =
+            bigQueryConfiguration.loadingMethod.gcsClientConfig.credential
+                as GcsHmacKeyConfiguration
+        awsAccessKeyConfiguration =
+            AWSAccessKeyConfiguration(
+                accessKeyId = credentials.accessKeyId,
+                secretAccessKey = credentials.secretAccessKey
+            )
+
+        objectStoragePathConfiguration =
+            ObjectStoragePathConfiguration(
+                prefix = bigQueryConfiguration.loadingMethod.gcsClientConfig.path,
+                pathPattern = "\${NAMESPACE}/\${STREAM_NAME}/",
+                fileNamePattern = "{date}_{timestamp}_{part_number}{format_extension}",
+            )
+    }
+}
+
+@Factory
+@Requires(condition = BigqueryConfiguredForBulkLoad::class)
+class BigqueryBLConfigurationProvider(private val config: BigqueryConfiguration) {
+    @Singleton fun get() = BigqueryBulkLoadConfiguration(config)
+}
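
Mapping GCS HMAC keys onto the S3/AWS configuration types works because GCS exposes an S3-interoperable XML API; pointing an S3-style client at GOOGLE_STORAGE_ENDPOINT (assumed here to resolve to https://storage.googleapis.com) lets the same HMAC pair authenticate uploads. A standalone sketch of that idea with the AWS SDK v2, illustrative only and not how the CDK wires its client:

    import java.net.URI
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
    import software.amazon.awssdk.regions.Region
    import software.amazon.awssdk.services.s3.S3Client

    // Sketch only: an S3 client that talks to GCS through its interoperability endpoint.
    fun gcsInteropClient(accessKeyId: String, secretAccessKey: String, region: String): S3Client =
        S3Client.builder()
            .endpointOverride(URI.create("https://storage.googleapis.com"))
            .region(Region.of(region))
            .credentialsProvider(
                StaticCredentialsProvider.create(
                    AwsBasicCredentials.create(accessKeyId, secretAccessKey)
                )
            )
            .build()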

airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt

+1
@@ -62,6 +62,7 @@ class StandardInsertRawOverrideDisableTd :
     }
 }

+// @Disabled("Disabling until we have the full flow")
 class GcsRawOverrideDisableTd :
     BigqueryWriteTest(
         BigQueryDestinationTestUtils.createConfig(

airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/expected-spec-cloud.json

+10-10
@@ -61,8 +61,8 @@
       "properties" : {
         "credential_type" : {
           "type" : "string",
-          "enum" : [ "HMAC key" ],
-          "default" : "HMAC key"
+          "enum" : [ "HMAC_KEY" ],
+          "default" : "HMAC_KEY"
         },
         "hmac_key_access_id" : {
           "type" : "string",
@@ -96,22 +96,22 @@
           "title" : "GCS Tmp Files Post-Processing",
           "order" : 3
         },
-        "gcs_bucket_path" : {
-          "type" : "string",
-          "description" : "Directory under the GCS bucket where data will be written.",
-          "title" : "GCS Bucket Path",
-          "examples" : [ "data_sync/test" ],
-          "order" : 2
-        },
         "gcs_bucket_name" : {
           "type" : "string",
           "description" : "The name of the GCS bucket. Read more <a href=\"https://cloud.google.com/storage/docs/naming-buckets\">here</a>.",
           "title" : "GCS Bucket Name",
           "examples" : [ "airbyte_sync" ],
           "order" : 1
+        },
+        "gcs_bucket_path" : {
+          "type" : "string",
+          "description" : "Directory under the GCS bucket where data will be written.",
+          "title" : "GCS Bucket Path",
+          "examples" : [ "data_sync/test" ],
+          "order" : 2
         }
       },
-      "required" : [ "method", "credential", "gcs_bucket_path", "gcs_bucket_name" ]
+      "required" : [ "method", "credential", "gcs_bucket_name", "gcs_bucket_path" ]
     } ],
     "description" : "The way data will be uploaded to BigQuery.",
     "title" : "Loading Method",

airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/expected-spec-oss.json

+10-10
@@ -61,8 +61,8 @@
       "properties" : {
         "credential_type" : {
           "type" : "string",
-          "enum" : [ "HMAC key" ],
-          "default" : "HMAC key"
+          "enum" : [ "HMAC_KEY" ],
+          "default" : "HMAC_KEY"
         },
         "hmac_key_access_id" : {
           "type" : "string",
@@ -96,22 +96,22 @@
           "title" : "GCS Tmp Files Post-Processing",
           "order" : 3
         },
-        "gcs_bucket_path" : {
-          "type" : "string",
-          "description" : "Directory under the GCS bucket where data will be written.",
-          "title" : "GCS Bucket Path",
-          "examples" : [ "data_sync/test" ],
-          "order" : 2
-        },
         "gcs_bucket_name" : {
           "type" : "string",
           "description" : "The name of the GCS bucket. Read more <a href=\"https://cloud.google.com/storage/docs/naming-buckets\">here</a>.",
           "title" : "GCS Bucket Name",
           "examples" : [ "airbyte_sync" ],
           "order" : 1
+        },
+        "gcs_bucket_path" : {
+          "type" : "string",
+          "description" : "Directory under the GCS bucket where data will be written.",
+          "title" : "GCS Bucket Path",
+          "examples" : [ "data_sync/test" ],
+          "order" : 2
         }
       },
-      "required" : [ "method", "credential", "gcs_bucket_path", "gcs_bucket_name" ]
+      "required" : [ "method", "credential", "gcs_bucket_name", "gcs_bucket_path" ]
     } ],
     "description" : "The way data will be uploaded to BigQuery.",
     "title" : "Loading Method",
