Skip to content

Commit a3aaac1

Browse files
edgaofrifriSF59
authored andcommitted
Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc (#57578)
# Conflicts: # airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt # airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt
1 parent a0429a4 commit a3aaac1

File tree

4 files changed

+89
-0
lines changed

4 files changed

+89
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db.direct_load_table
6+
7+
interface DirectLoadSqlGenerator {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db.direct_load_table
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.orchestration.db.DatabaseHandler
9+
import io.airbyte.cdk.load.orchestration.db.TableName
10+
11+
class DirectLoadTableOperations(
12+
private val sqlGenerator: DirectLoadSqlGenerator,
13+
private val databaseHandler: DatabaseHandler,
14+
) {
15+
fun createTable(
16+
stream: DestinationStream,
17+
finalTableName: TableName,
18+
suffix: String,
19+
replace: Boolean
20+
) {
21+
databaseHandler.execute(TODO())
22+
}
23+
24+
fun alterTable(
25+
stream: DestinationStream,
26+
finalTableName: TableName,
27+
) {
28+
// TODO we should figure out some reasonable abstraction for diffing existing+expected
29+
// table schema
30+
TODO()
31+
}
32+
33+
fun overwriteFinalTable(
34+
finalTableName: TableName,
35+
suffix: String,
36+
) {
37+
TODO()
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db.direct_load_table
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.state.StreamProcessingFailed
9+
import io.airbyte.cdk.load.write.StreamLoader
10+
11+
class DirectLoadTableStreamLoader(
12+
override val stream: DestinationStream,
13+
val tableOperations: DirectLoadTableOperations,
14+
) : StreamLoader {
15+
override suspend fun start() {
16+
// TODO
17+
// * create table if not exists
18+
// * alter table if needed
19+
// * truncate refresh setup
20+
}
21+
22+
override suspend fun close(hadNonzeroRecords: Boolean, streamFailure: StreamProcessingFailed?) {
23+
// TODO
24+
// * commit truncate refresh
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db.direct_load_table
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.write.DestinationWriter
9+
import io.airbyte.cdk.load.write.StreamLoader
10+
11+
class DirectLoadTableWriter(
12+
private val tableOperations: DirectLoadTableOperations,
13+
) : DestinationWriter {
14+
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
15+
return DirectLoadTableStreamLoader(stream, tableOperations)
16+
}
17+
}

0 commit comments

Comments
 (0)