Skip to content

Destinations CDK: Avoid issuing multiple create schema calls per stream. #38357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. |
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.6
version=0.35.7
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class NoOpJdbcDestinationHandler<DestinationState>(
throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
}

/**
 * No-op implementation of namespace creation.
 *
 * @param schemas namespaces requested for creation; ignored by this implementation.
 */
override fun createNamespaces(schemas: Set<String>) {
    // Intentionally empty: per the original note, this is not used in the old code path.
}

override fun toJdbcTypeName(airbyteType: AirbyteType): String {
throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
val stream = destinationInitialStatus.streamConfig
storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
if (!disableTypeDedupe) {
storageOperation.createFinalNamespace(stream.id)
// Prepare final tables based on sync mode.
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
private val defaultNamespace: String,
private val streamOperationFactory: StreamOperationFactory<DestinationState>,
private val migrations: List<Migration<DestinationState>>,
private val disableTypeDedupe: Boolean = false,
private val executorService: ExecutorService =
Executors.newFixedThreadPool(
10,
Expand All @@ -47,7 +48,8 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(

private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
log.info { "Preparing required schemas and tables for all streams" }
val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
val streamConfigs = parsedCatalog.streams
val streamsInitialStates = destinationHandler.gatherInitialState(streamConfigs)

val postMigrationInitialStates =
tdutils.executeRawTableMigrations(
Expand All @@ -60,11 +62,23 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
postMigrationInitialStates.associate { it.streamConfig.id to it.destinationState }
)

// Prepare raw and final schemas
val rawNamespaces = streamConfigs.map { it.id.rawNamespace }.toSet()
val finalNamespaces = streamConfigs.map { it.id.finalNamespace }.toSet()
val allNamespaces =
if (disableTypeDedupe) rawNamespaces else rawNamespaces + finalNamespaces
destinationHandler.createNamespaces(allNamespaces)

val initializationFutures =
postMigrationInitialStates
.map {
CompletableFuture.supplyAsync(
{ Pair(it.streamConfig.id, streamOperationFactory.createInstance(it)) },
{
Pair(
it.streamConfig.id,
streamOperationFactory.createInstance(it, disableTypeDedupe)
)
},
executorService,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ interface StorageOperation<Data> {
* ==================== Final Table Operations ================================
*/

/** Create final namespace extracted from [StreamId] */
fun createFinalNamespace(streamId: StreamId)

/**
 * Create final table extracted from [StreamId].
 *
 * @param streamConfig the stream configuration whose final table is created
 * @param suffix NOTE(review): presumably a suffix for the final table name (empty for the
 * regular table, non-empty for a temporary one) — confirm against implementations
 * @param replace NOTE(review): looks like this requests replacing an existing table — confirm
 */
fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fun interface StreamOperationFactory<DestinationState> {
* implementation.
*/
fun createInstance(
destinationInitialStatus: DestinationInitialStatus<DestinationState>
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
disableTypeDedupe: Boolean,
): StreamOperation<DestinationState>
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ interface DestinationHandler<DestinationState> {

@Throws(Exception::class)
fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>)

/**
 * Create all namespaces required for the sync. Implementations may optimize by checking
 * whether a schema already exists.
 *
 * This lives here rather than in `StorageOperation` so that a create-namespace call is not
 * issued for every stream; instead it is called from the sync operation with the distinct
 * set of namespaces required within the sync.
 *
 * @param schemas the distinct namespace names to create
 */
fun createNamespaces(schemas: Set<String>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ interface SqlGenerator {
fun createTable(stream: StreamConfig, suffix: String, force: Boolean): Sql

/**
* Used to create either the airbyte_internal or final schemas if they don't exist
* TODO delete this; superseded by [DestinationHandler.createNamespaces]
*
* @param schema the schema to create
* @return SQL to create the schema if it does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, "", false)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -143,7 +142,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -186,7 +184,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
// No table creation - we can just reuse the existing table.
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -227,7 +224,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -274,7 +270,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -316,7 +311,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, "", false)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -360,7 +354,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.softResetFinalTable(streamConfig)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -402,7 +395,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
// No soft reset - we can just reuse the existing table.
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -440,7 +432,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.softResetFinalTable(streamConfig)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -493,7 +484,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
}
confirmVerified(storageOperation)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DefaultSyncOperationTest {
private val streamOperations: MutableMap<StreamConfig, StreamOperation<MockState>> =
mutableMapOf()
private val streamOperationFactory: StreamOperationFactory<MockState> =
StreamOperationFactory { initialStatus: DestinationInitialStatus<MockState> ->
StreamOperationFactory { initialStatus: DestinationInitialStatus<MockState>, _ ->
streamOperations.computeIfAbsent(initialStatus.streamConfig) {
spyk(TestStreamOperation(initialStatus.destinationState))
}
Expand Down Expand Up @@ -118,6 +118,9 @@ class DefaultSyncOperationTest {
),
),
)
destinationHandler.createNamespaces(
setOf(appendStreamConfig.id.rawNamespace, appendStreamConfig.id.finalNamespace)
)
streamOperations.values.onEach { it.updatedDestinationState }
destinationHandler.commitDestinationStates(
mapOf(
Expand Down Expand Up @@ -201,6 +204,9 @@ class DefaultSyncOperationTest {
),
),
)
destinationHandler.createNamespaces(
setOf(appendStreamConfig.id.rawNamespace, appendStreamConfig.id.finalNamespace)
)
streamOperations.values.onEach { it.updatedDestinationState }
destinationHandler.commitDestinationStates(
mapOf(
Expand Down