Skip to content

Commit 20b118f

Browse files
committed
Connector operations by responsibility
1 parent e0225c1 commit 20b118f

File tree

11 files changed

+622
-289
lines changed

11 files changed

+622
-289
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.protocol.models.v0.StreamDescriptor
10+
import java.util.stream.Stream
11+
12+
/** Connector sync operations */
13+
interface SyncOperation {
14+
15+
/** DestinationFlush function sends per stream with descriptor. */
16+
fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)
17+
fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
10+
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
11+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
12+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
13+
import io.airbyte.protocol.models.v0.DestinationSyncMode
14+
import io.github.oshai.kotlinlogging.KotlinLogging
15+
import java.util.stream.Stream
16+
17+
abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState>(
18+
private val storageOperations: StorageOperations,
19+
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
20+
) : StreamOperation<DestinationState> {
21+
private val log = KotlinLogging.logger {}
22+
23+
// State maintained to make decision between async calls
24+
private val finalTmpTableSuffix: String
25+
private val initialRawTableStatus: InitialRawTableStatus =
26+
destinationInitialStatus.initialRawTableStatus
27+
init {
28+
val stream = destinationInitialStatus.streamConfig
29+
storageOperations.prepareStage(stream.id, stream.destinationSyncMode)
30+
storageOperations.createFinalSchema(stream.id)
31+
// Prepare final tables based on sync mode.
32+
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
33+
}
34+
35+
companion object {
36+
private const val NO_SUFFIX = ""
37+
private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
38+
}
39+
40+
private fun prepareFinalTable(
41+
initialStatus: DestinationInitialStatus<DestinationState>
42+
): String {
43+
val stream = initialStatus.streamConfig
44+
// No special handling if final table doesn't exist, just create and return
45+
if (!initialStatus.isFinalTablePresent) {
46+
log.info {
47+
"Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
48+
}
49+
storageOperations.createFinalTable(stream, NO_SUFFIX, false)
50+
return NO_SUFFIX
51+
}
52+
53+
log.info { "Final Table exists for stream ${stream.id.finalName}" }
54+
// The table already exists. Decide whether we're writing to it directly, or
55+
// using a tmp table.
56+
when (stream.destinationSyncMode) {
57+
DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
58+
DestinationSyncMode.APPEND,
59+
DestinationSyncMode.APPEND_DEDUP -> {
60+
if (
61+
initialStatus.isSchemaMismatch ||
62+
initialStatus.destinationState.needsSoftReset()
63+
) {
64+
// We're loading data directly into the existing table.
65+
// Make sure it has the right schema.
66+
// Also, if a raw table migration wants us to do a soft reset, do that
67+
// here.
68+
log.info { "Executing soft-reset on final table of stream $stream" }
69+
storageOperations.softResetFinalTable(stream)
70+
}
71+
return NO_SUFFIX
72+
}
73+
}
74+
}
75+
76+
private fun prepareFinalTableForOverwrite(
77+
initialStatus: DestinationInitialStatus<DestinationState>
78+
): String {
79+
val stream = initialStatus.streamConfig
80+
if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
81+
// overwrite an existing tmp table if needed.
82+
storageOperations.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
83+
log.info {
84+
"Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
85+
}
86+
// We want to overwrite an existing table. Write into a tmp table.
87+
// We'll overwrite the table at the
88+
// end of the sync.
89+
return TMP_OVERWRITE_TABLE_SUFFIX
90+
}
91+
92+
log.info {
93+
"Final Table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
94+
}
95+
return NO_SUFFIX
96+
}
97+
98+
/** Write records will be destination type specific, Insert vs staging based on format */
99+
abstract override fun writeRecords(
100+
streamConfig: StreamConfig,
101+
stream: Stream<PartialAirbyteMessage>
102+
)
103+
104+
override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
105+
// Legacy logic that if recordsWritten or not tracked then it could be non-zero
106+
val shouldRunFinalizer =
107+
syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
108+
initialRawTableStatus.hasUnprocessedRecords
109+
if (!shouldRunFinalizer) {
110+
log.info {
111+
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
112+
"because it had no records during this sync and no unprocessed records from a previous sync."
113+
}
114+
return
115+
}
116+
117+
storageOperations.typeAndDedupe(
118+
streamConfig,
119+
initialRawTableStatus.maxProcessedTimestamp,
120+
finalTmpTableSuffix
121+
)
122+
123+
// Delete staging directory, implementation will handle if it has to do it or not or a No-OP
124+
storageOperations.cleanupStage(streamConfig.id)
125+
126+
// For overwrite, its wasteful to do T+D so we don't do soft-reset in prepare. Instead we do
127+
// type-dedupe
128+
// on a suffixed table and do a swap here when we have to for schema mismatches
129+
if (streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
130+
storageOperations.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
131+
}
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
10+
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
11+
import io.airbyte.commons.concurrency.CompletableFutures.allOf
12+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
13+
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
14+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
15+
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
16+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
17+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
18+
import io.airbyte.protocol.models.v0.StreamDescriptor
19+
import io.github.oshai.kotlinlogging.KotlinLogging
20+
import java.util.concurrent.CompletableFuture
21+
import java.util.concurrent.ExecutorService
22+
import java.util.concurrent.Executors
23+
import java.util.stream.Stream
24+
import org.apache.commons.lang3.concurrent.BasicThreadFactory
25+
26+
class DefaultConnectorOperation<DestinationState : MinimumDestinationState>(
27+
private val parsedCatalog: ParsedCatalog,
28+
private val destinationHandler: DestinationHandler<DestinationState>,
29+
private val defaultNamespace: String,
30+
private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
31+
private val migrations: List<Migration<DestinationState>>,
32+
private val executorService: ExecutorService =
33+
Executors.newFixedThreadPool(
34+
10,
35+
BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
36+
)
37+
) : SyncOperation {
38+
companion object {
39+
// Use companion to be accessible during instantiation with init
40+
private val log = KotlinLogging.logger {}
41+
}
42+
43+
private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>
44+
init {
45+
streamOpsMap = createPerStreamOpClients()
46+
}
47+
48+
private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
49+
log.info { "Preparing required schemas and tables for all streams" }
50+
val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
51+
52+
// we will commit destinationStates and run Migrations here.
53+
val postMigrationInitialStates =
54+
tdutils.executeRawTableMigrations(
55+
executorService,
56+
destinationHandler,
57+
migrations,
58+
streamsInitialStates
59+
)
60+
61+
val initializationFutures =
62+
postMigrationInitialStates
63+
.map {
64+
CompletableFuture.supplyAsync(
65+
{ Pair(it.streamConfig.id, streamOperationsFactory.createInstance(it)) },
66+
executorService,
67+
)
68+
}
69+
.toList()
70+
val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
71+
val result =
72+
exceptions.getResultsOrLogAndThrowFirst(
73+
"Following exceptions occurred during sync initialization",
74+
futuresResult,
75+
)
76+
return result.toMap()
77+
}
78+
79+
override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
80+
val streamConfig =
81+
parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
82+
streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
83+
}
84+
85+
override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
86+
// Only call finalizeTable operations which has summary. rest will be skipped
87+
val finalizeFutures =
88+
streamSyncSummaries.entries
89+
.map {
90+
CompletableFuture.supplyAsync(
91+
{
92+
val streamConfig =
93+
parsedCatalog.getStream(
94+
it.key.namespace ?: defaultNamespace,
95+
it.key.name,
96+
)
97+
streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
98+
},
99+
executorService,
100+
)
101+
}
102+
.toList()
103+
val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
104+
exceptions.getResultsOrLogAndThrowFirst(
105+
"Following exceptions occurred while finalizing the sync",
106+
futuresResult,
107+
)
108+
log.info { "Cleaning up sync operation thread pools" }
109+
executorService.shutdown()
110+
}
111+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
8+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
9+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
10+
import io.airbyte.protocol.models.v0.DestinationSyncMode
11+
import java.time.Instant
12+
import java.util.Optional
13+
14+
interface StorageOperations {
15+
/*
16+
* ==================== Staging Operations ================================
17+
*/
18+
19+
/**
20+
* Prepare staging area which cloud be creating any object storage, temp tables or file storage
21+
*/
22+
fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
23+
24+
/** Delete previously staged data, using deterministic information from streamId. */
25+
fun cleanupStage(streamId: StreamId)
26+
27+
/** Copy data from provided buffer into stage. */
28+
fun writeToStage(streamId: StreamId, buffer: SerializableBuffer)
29+
30+
/*
31+
* ==================== Final Table Operations ================================
32+
*/
33+
34+
/** Create final schema extracted from [StreamId] */
35+
fun createFinalSchema(streamId: StreamId)
36+
37+
/** Create final table extracted from [StreamId] */
38+
fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)
39+
40+
/** Reset the final table using a temp table or ALTER existing table's columns. */
41+
fun softResetFinalTable(streamConfig: StreamConfig)
42+
43+
/**
44+
* Attempt to atomically swap the final table (name and namespace extracted from [StreamId]).
45+
* This could be destination specific, INSERT INTO..SELECT * and DROP TABLE OR CREATE OR REPLACE
46+
* ... SELECT *, DROP TABLE
47+
*/
48+
fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String)
49+
50+
/**
51+
*/
52+
fun typeAndDedupe(
53+
streamConfig: StreamConfig,
54+
maxProcessedTimestamp: Optional<Instant>,
55+
finalTableSuffix: String
56+
)
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
10+
import java.util.stream.Stream
11+
12+
/** Operations on individual streams. */
13+
interface StreamOperation<T> {
14+
15+
fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>)
16+
17+
fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary)
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
8+
9+
interface StreamOperationsFactory<DestinationState> {
10+
11+
/**
12+
* Create an instance with required dependencies injected using a concrete factory
13+
* implementation.
14+
*/
15+
fun createInstance(
16+
destinationInitialStatus: DestinationInitialStatus<DestinationState>
17+
): StreamOperation<DestinationState>
18+
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import io.airbyte.cdk.integrations.base.IntegrationRunner
77
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
88
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
99
import io.airbyte.commons.concurrency.CompletableFutures
10-
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeRawTableMigrations
11-
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeWeirdMigrations
12-
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.prepareSchemas
10+
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeRawTableMigrations
11+
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeWeirdMigrations
12+
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.prepareSchemas
1313
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
1414
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
1515
import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -186,7 +186,7 @@ class DefaultTyperDeduper<DestinationState : MinimumDestinationState>(
186186
// Make sure it has the right schema.
187187
// Also, if a raw table migration wants us to do a soft reset, do that
188188
// here.
189-
TypeAndDedupeTransaction.executeSoftReset(
189+
TyperDeduperUtil.executeSoftReset(
190190
sqlGenerator,
191191
destinationHandler,
192192
stream
@@ -267,7 +267,7 @@ class DefaultTyperDeduper<DestinationState : MinimumDestinationState>(
267267

268268
val initialRawTableStatus =
269269
initialRawTableStateByStream.getValue(streamConfig.id)
270-
TypeAndDedupeTransaction.executeTypeAndDedupe(
270+
TyperDeduperUtil.executeTypeAndDedupe(
271271
sqlGenerator,
272272
destinationHandler,
273273
streamConfig,

0 commit comments

Comments
 (0)