Skip to content

Commit 9e373dd

Browse files
authored
Destinations CDK: Avoid issuing multiple create schema calls per stream. (#38357)
1 parent 6f98681 commit 9e373dd

File tree

11 files changed

+42
-20
lines changed

11 files changed

+42
-20
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple namespace creation from the per-stream operation interface. |
177178
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
178179
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
179180
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.35.6
1+
version=0.35.7

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ class NoOpJdbcDestinationHandler<DestinationState>(
4444
throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
4545
}
4646

47+
override fun createNamespaces(schemas: Set<String>) {
48+
// Empty op, not used in old code.
49+
}
50+
4751
override fun toJdbcTypeName(airbyteType: AirbyteType): String {
4852
throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
4953
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
3737
val stream = destinationInitialStatus.streamConfig
3838
storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
3939
if (!disableTypeDedupe) {
40-
storageOperation.createFinalNamespace(stream.id)
4140
// Prepare final tables based on sync mode.
4241
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
4342
} else {

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt

+16-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
2929
private val defaultNamespace: String,
3030
private val streamOperationFactory: StreamOperationFactory<DestinationState>,
3131
private val migrations: List<Migration<DestinationState>>,
32+
private val disableTypeDedupe: Boolean = false,
3233
private val executorService: ExecutorService =
3334
Executors.newFixedThreadPool(
3435
10,
@@ -47,7 +48,8 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
4748

4849
private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
4950
log.info { "Preparing required schemas and tables for all streams" }
50-
val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
51+
val streamConfigs = parsedCatalog.streams
52+
val streamsInitialStates = destinationHandler.gatherInitialState(streamConfigs)
5153

5254
val postMigrationInitialStates =
5355
tdutils.executeRawTableMigrations(
@@ -60,11 +62,23 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
6062
postMigrationInitialStates.associate { it.streamConfig.id to it.destinationState }
6163
)
6264

65+
// Prepare raw and final schemas
66+
val rawNamespaces = streamConfigs.map { it.id.rawNamespace }.toSet()
67+
val finalNamespaces = streamConfigs.map { it.id.finalNamespace }.toSet()
68+
val allNamespaces =
69+
if (disableTypeDedupe) rawNamespaces else rawNamespaces + finalNamespaces
70+
destinationHandler.createNamespaces(allNamespaces)
71+
6372
val initializationFutures =
6473
postMigrationInitialStates
6574
.map {
6675
CompletableFuture.supplyAsync(
67-
{ Pair(it.streamConfig.id, streamOperationFactory.createInstance(it)) },
76+
{
77+
Pair(
78+
it.streamConfig.id,
79+
streamOperationFactory.createInstance(it, disableTypeDedupe)
80+
)
81+
},
6882
executorService,
6983
)
7084
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt

-3
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ interface StorageOperation<Data> {
3030
* ==================== Final Table Operations ================================
3131
*/
3232

33-
/** Create final namespace extracted from [StreamId] */
34-
fun createFinalNamespace(streamId: StreamId)
35-
3633
/** Create final table extracted from [StreamId] */
3734
fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)
3835

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationFactory.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ fun interface StreamOperationFactory<DestinationState> {
1313
* implementation.
1414
*/
1515
fun createInstance(
16-
destinationInitialStatus: DestinationInitialStatus<DestinationState>
16+
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
17+
disableTypeDedupe: Boolean,
1718
): StreamOperation<DestinationState>
1819
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.kt

+10
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,14 @@ interface DestinationHandler<DestinationState> {
1919

2020
@Throws(Exception::class)
2121
fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>)
22+
23+
/**
24+
* Create all namespaces required for the sync. Implementations may optimize by
25+
* checking whether a schema already exists.
26+
*
27+
* This exists here instead of in StorageOperation to avoid issuing a create-namespace call for
28+
* every stream; instead, it is called from the sync operation with the distinct set of
29+
* namespaces required within the sync.
30+
*/
31+
fun createNamespaces(schemas: Set<String>)
2232
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ interface SqlGenerator {
2929
fun createTable(stream: StreamConfig, suffix: String, force: Boolean): Sql
3030

3131
/**
32-
* Used to create either the airbyte_internal or final schemas if they don't exist
32+
* TODO delete this; superseded by [DestinationHandler.createNamespaces]
3333
*
3434
* @param schema the schema to create
3535
* @return SQL to create the schema if it does not exist

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt

-10
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ class AbstractStreamOperationTest {
9898

9999
verifySequence {
100100
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
101-
storageOperation.createFinalNamespace(streamId)
102101
storageOperation.createFinalTable(streamConfig, "", false)
103102
}
104103
confirmVerified(storageOperation)
@@ -143,7 +142,6 @@ class AbstractStreamOperationTest {
143142

144143
verifySequence {
145144
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
146-
storageOperation.createFinalNamespace(streamId)
147145
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
148146
}
149147
confirmVerified(storageOperation)
@@ -186,7 +184,6 @@ class AbstractStreamOperationTest {
186184

187185
verifySequence {
188186
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
189-
storageOperation.createFinalNamespace(streamId)
190187
// No table creation - we can just reuse the existing table.
191188
}
192189
confirmVerified(storageOperation)
@@ -227,7 +224,6 @@ class AbstractStreamOperationTest {
227224

228225
verifySequence {
229226
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
230-
storageOperation.createFinalNamespace(streamId)
231227
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
232228
}
233229
confirmVerified(storageOperation)
@@ -274,7 +270,6 @@ class AbstractStreamOperationTest {
274270

275271
verifySequence {
276272
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
277-
storageOperation.createFinalNamespace(streamId)
278273
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
279274
}
280275
confirmVerified(storageOperation)
@@ -316,7 +311,6 @@ class AbstractStreamOperationTest {
316311

317312
verifySequence {
318313
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
319-
storageOperation.createFinalNamespace(streamId)
320314
storageOperation.createFinalTable(streamConfig, "", false)
321315
}
322316
confirmVerified(storageOperation)
@@ -360,7 +354,6 @@ class AbstractStreamOperationTest {
360354

361355
verifySequence {
362356
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
363-
storageOperation.createFinalNamespace(streamId)
364357
storageOperation.softResetFinalTable(streamConfig)
365358
}
366359
confirmVerified(storageOperation)
@@ -402,7 +395,6 @@ class AbstractStreamOperationTest {
402395

403396
verifySequence {
404397
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
405-
storageOperation.createFinalNamespace(streamId)
406398
// No soft reset - we can just reuse the existing table.
407399
}
408400
confirmVerified(storageOperation)
@@ -440,7 +432,6 @@ class AbstractStreamOperationTest {
440432

441433
verifySequence {
442434
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
443-
storageOperation.createFinalNamespace(streamId)
444435
storageOperation.softResetFinalTable(streamConfig)
445436
}
446437
confirmVerified(storageOperation)
@@ -493,7 +484,6 @@ class AbstractStreamOperationTest {
493484

494485
verifySequence {
495486
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
496-
storageOperation.createFinalNamespace(streamId)
497487
}
498488
confirmVerified(storageOperation)
499489

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class DefaultSyncOperationTest {
5757
private val streamOperations: MutableMap<StreamConfig, StreamOperation<MockState>> =
5858
mutableMapOf()
5959
private val streamOperationFactory: StreamOperationFactory<MockState> =
60-
StreamOperationFactory { initialStatus: DestinationInitialStatus<MockState> ->
60+
StreamOperationFactory { initialStatus: DestinationInitialStatus<MockState>, _ ->
6161
streamOperations.computeIfAbsent(initialStatus.streamConfig) {
6262
spyk(TestStreamOperation(initialStatus.destinationState))
6363
}
@@ -118,6 +118,9 @@ class DefaultSyncOperationTest {
118118
),
119119
),
120120
)
121+
destinationHandler.createNamespaces(
122+
setOf(appendStreamConfig.id.rawNamespace, appendStreamConfig.id.finalNamespace)
123+
)
121124
streamOperations.values.onEach { it.updatedDestinationState }
122125
destinationHandler.commitDestinationStates(
123126
mapOf(
@@ -201,6 +204,9 @@ class DefaultSyncOperationTest {
201204
),
202205
),
203206
)
207+
destinationHandler.createNamespaces(
208+
setOf(appendStreamConfig.id.rawNamespace, appendStreamConfig.id.finalNamespace)
209+
)
204210
streamOperations.values.onEach { it.updatedDestinationState }
205211
destinationHandler.commitDestinationStates(
206212
mapOf(

0 commit comments

Comments
 (0)