Skip to content

Commit f40bd48

Browse files
gisripaedgao
authored and committed
Connector operations by responsibility
1 parent 31c95da commit f40bd48

File tree

14 files changed

+724
-292
lines changed

14 files changed

+724
-292
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.protocol.models.v0.StreamDescriptor
10+
import java.util.stream.Stream
11+
12+
/**
 * Connector sync operations.
 *
 * Implementations receive per-stream batches of records during a sync via [flushStream] and are
 * given a chance to run end-of-sync work per stream via [finalizeStreams].
 */
interface SyncOperation {

    /**
     * DestinationFlush function sends per stream with descriptor.
     *
     * @param descriptor identifies which configured stream the records belong to
     * @param stream the batch of partially-deserialized records to write
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    /**
     * Runs end-of-sync finalization for the given streams.
     *
     * @param streamSyncSummaries per-stream sync summaries keyed by stream descriptor
     */
    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.staging.operation
6+
7+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
10+
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
11+
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator
12+
import io.airbyte.commons.json.Jsons
13+
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
14+
import io.airbyte.integrations.base.destination.operation.StorageOperations
15+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
16+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
17+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
18+
import io.github.oshai.kotlinlogging.KotlinLogging
19+
import java.util.stream.Stream
20+
import org.apache.commons.io.FileUtils
21+
22+
/**
 * Stream operation for staging-based destinations: records are serialized into a gzipped CSV
 * buffer and uploaded to the destination's staging area via [StorageOperations.writeToStage].
 * Table preparation and finalization are inherited from [AbstractStreamOperation].
 */
class StagingStreamOperations<DestinationState : MinimumDestinationState>(
    private val storageOperations: StorageOperations,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    disableTypeDedupe: Boolean = false
) :
    AbstractStreamOperation<DestinationState>(
        storageOperations,
        destinationInitialStatus,
        disableTypeDedupe
    ) {

    private val log = KotlinLogging.logger {}

    /**
     * Serializes [stream] into a single gzipped CSV file buffer (V2 columns without the meta
     * column in the sheet itself; meta is passed alongside each record) and pushes the buffer to
     * the stage for [streamConfig]'s stream.
     */
    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
        val sheetGenerator =
            StagingDatabaseCsvSheetGenerator(JavaBaseConstants.DestinationColumns.V2_WITHOUT_META)
        val buffer =
            CsvSerializedBuffer(FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX), sheetGenerator, true)

        // `use` guarantees the file buffer is closed (and its temp file cleaned up) even if
        // serialization or the upload throws.
        buffer.use { csvBuffer ->
            stream.forEach { message: PartialAirbyteMessage ->
                // serialized/record are expected to be populated by the async framework upstream.
                csvBuffer.accept(
                    message.serialized!!,
                    Jsons.serialize(message.record!!.meta),
                    message.record!!.emittedAt
                )
            }
            csvBuffer.flush()
            log.info {
                "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(csvBuffer.byteCount)}) to staging"
            }
            storageOperations.writeToStage(streamConfig.id, buffer)
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
10+
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
11+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
12+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
13+
import io.airbyte.protocol.models.v0.DestinationSyncMode
14+
import io.github.oshai.kotlinlogging.KotlinLogging
15+
import java.util.stream.Stream
16+
17+
/**
 * Base class for per-stream operations. On construction it prepares the stage and (unless
 * [disableTypeDedupe]) the final schema/table for the stream, recording whether writes should
 * target the real final table or a temporary `_airbyte_tmp`-suffixed table. Subclasses supply the
 * destination-specific [writeRecords] implementation; [finalizeTable] runs the end-of-sync
 * typing/deduping and overwrite swap.
 */
abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState>(
    private val storageOperations: StorageOperations,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    private val disableTypeDedupe: Boolean = false
) : StreamOperation<DestinationState> {
    private val log = KotlinLogging.logger {}

    // State maintained to make decision between async calls.
    // finalTmpTableSuffix is "" when writing directly to the final table, or
    // TMP_OVERWRITE_TABLE_SUFFIX when writing to a temp table that is swapped in at finalize.
    private val finalTmpTableSuffix: String
    // Raw-table status captured at sync start; consulted in finalizeTable to decide whether
    // typing/deduping can be skipped.
    private val initialRawTableStatus: InitialRawTableStatus =
        destinationInitialStatus.initialRawTableStatus

    // NOTE: this init does real destination work (stage/schema/table preparation); constructing
    // an instance is not side-effect free.
    init {
        val stream = destinationInitialStatus.streamConfig
        storageOperations.prepareStage(stream.id, stream.destinationSyncMode)
        if (!disableTypeDedupe) {
            storageOperations.createFinalSchema(stream.id)
            // Prepare final tables based on sync mode.
            finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
        } else {
            log.info { "Typing and deduping disabled, skipping final table initialization" }
            finalTmpTableSuffix = NO_SUFFIX
        }
    }

    companion object {
        private const val NO_SUFFIX = ""
        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
    }

    /**
     * Ensures the final table exists and decides which table suffix subsequent writes should use.
     *
     * @return [NO_SUFFIX] to write to the final table directly, or
     * [TMP_OVERWRITE_TABLE_SUFFIX] (via [prepareFinalTableForOverwrite]) when an OVERWRITE sync
     * must write to a temp table.
     */
    private fun prepareFinalTable(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        // No special handling if final table doesn't exist, just create and return
        if (!initialStatus.isFinalTablePresent) {
            log.info {
                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
            }
            storageOperations.createFinalTable(stream, NO_SUFFIX, false)
            return NO_SUFFIX
        }

        log.info { "Final Table exists for stream ${stream.id.finalName}" }
        // The table already exists. Decide whether we're writing to it directly, or
        // using a tmp table.
        when (stream.destinationSyncMode) {
            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
            DestinationSyncMode.APPEND,
            DestinationSyncMode.APPEND_DEDUP -> {
                if (
                    initialStatus.isSchemaMismatch ||
                        initialStatus.destinationState.needsSoftReset()
                ) {
                    // We're loading data directly into the existing table.
                    // Make sure it has the right schema.
                    // Also, if a raw table migration wants us to do a soft reset, do that
                    // here.
                    log.info { "Executing soft-reset on final table of stream $stream" }
                    storageOperations.softResetFinalTable(stream)
                }
                return NO_SUFFIX
            }
        }
    }

    /**
     * OVERWRITE handling: if the existing final table has data or a mismatched schema, create a
     * temp table and return its suffix so the sync writes there (the swap happens in
     * [finalizeTable]); otherwise write to the final table directly.
     */
    private fun prepareFinalTableForOverwrite(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
            // overwrite an existing tmp table if needed.
            storageOperations.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
            log.info {
                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
            }
            // We want to overwrite an existing table. Write into a tmp table.
            // We'll overwrite the table at the
            // end of the sync.
            return TMP_OVERWRITE_TABLE_SUFFIX
        }

        log.info {
            "Final Table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
        }
        return NO_SUFFIX
    }

    /** Write records will be destination type specific, Insert vs staging based on format */
    abstract override fun writeRecords(
        streamConfig: StreamConfig,
        stream: Stream<PartialAirbyteMessage>
    )

    /**
     * End-of-sync work for one stream: clean up the stage, then (unless type/dedupe is disabled
     * or there is provably nothing to process) run typing/deduping, and for OVERWRITE syncs that
     * wrote to a temp table, swap it into place.
     */
    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
        // Delete staging directory, implementation will handle if it has to do it or not or a No-OP
        storageOperations.cleanupStage(streamConfig.id)
        if (disableTypeDedupe) {
            log.info {
                "Typing and deduping disabled, skipping final table finalization. " +
                    "Raw records can be found at ${streamConfig.id.rawNamespace}.${streamConfig.id.rawName}"
            }
            return
        }

        // Legacy logic that if recordsWritten or not tracked then it could be non-zero.
        // An absent count is treated as "assume records were written" (orElse(true)).
        val shouldRunFinalizer =
            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
                initialRawTableStatus.hasUnprocessedRecords
        if (!shouldRunFinalizer) {
            log.info {
                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
                    "because it had no records during this sync and no unprocessed records from a previous sync."
            }
            return
        }

        storageOperations.typeAndDedupe(
            streamConfig,
            initialRawTableStatus.maxProcessedTimestamp,
            finalTmpTableSuffix
        )

        // For overwrite, It's wasteful to do T+D, so we don't do soft-reset in prepare. Instead, we
        // do
        // type-dedupe
        // on a suffixed table and do a swap here when we have to for schema mismatches
        if (
            streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
                finalTmpTableSuffix.isNotBlank()
        ) {
            storageOperations.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
10+
import io.airbyte.protocol.models.v0.StreamDescriptor
11+
import java.util.stream.Stream
12+
13+
/**
 * [DestinationFlushFunction] that simply delegates each per-stream batch to the provided
 * [SyncOperation], advertising the given optimal batch size to the async framework.
 */
class DefaultFlush(
    override val optimalBatchSizeBytes: Long,
    private val syncOperation: SyncOperation
) : DestinationFlushFunction {
    /** Forwards the batch for [streamDescriptor] to [SyncOperation.flushStream]. */
    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) =
        syncOperation.flushStream(streamDescriptor, stream)
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.base.destination.operation
6+
7+
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
10+
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
11+
import io.airbyte.commons.concurrency.CompletableFutures.allOf
12+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
13+
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
14+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
15+
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
16+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
17+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
18+
import io.airbyte.protocol.models.v0.StreamDescriptor
19+
import io.github.oshai.kotlinlogging.KotlinLogging
20+
import java.util.concurrent.CompletableFuture
21+
import java.util.concurrent.ExecutorService
22+
import java.util.concurrent.Executors
23+
import java.util.stream.Stream
24+
import org.apache.commons.lang3.concurrent.BasicThreadFactory
25+
26+
/**
 * Default [SyncOperation] implementation. On construction it gathers initial destination state
 * for every stream in [parsedCatalog], executes raw-table migrations, and builds one
 * [StreamOperation] per stream (initialized in parallel on [executorService]). Subsequent
 * [flushStream] / [finalizeStreams] calls are routed to the matching per-stream operation.
 *
 * NOTE: construction performs real destination work (state gathering, migrations, table prep)
 * via the init block.
 */
class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
    private val parsedCatalog: ParsedCatalog,
    private val destinationHandler: DestinationHandler<DestinationState>,
    private val defaultNamespace: String,
    private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
    private val migrations: List<Migration<DestinationState>>,
    private val executorService: ExecutorService =
        Executors.newFixedThreadPool(
            10,
            BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
        )
) : SyncOperation {
    companion object {
        // Use companion to be accessible during instantiation with init
        private val log = KotlinLogging.logger {}
    }

    // One StreamOperation per stream id, built eagerly during construction.
    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>
    init {
        streamOpsMap = createPerStreamOpClients()
    }

    /**
     * Gathers initial state for all catalog streams, runs raw-table migrations (which also commit
     * destination states), then creates a [StreamOperation] per stream concurrently on
     * [executorService]. Logs and rethrows the first initialization failure, if any.
     */
    private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
        log.info { "Preparing required schemas and tables for all streams" }
        val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)

        // we will commit destinationStates and run Migrations here.
        val postMigrationInitialStates =
            tdutils.executeRawTableMigrations(
                executorService,
                destinationHandler,
                migrations,
                streamsInitialStates
            )

        val initializationFutures =
            postMigrationInitialStates
                .map {
                    CompletableFuture.supplyAsync(
                        { Pair(it.streamConfig.id, streamOperationsFactory.createInstance(it)) },
                        executorService,
                    )
                }
                .toList()
        val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
        val result =
            exceptions.getResultsOrLogAndThrowFirst(
                "Following exceptions occurred during sync initialization",
                futuresResult,
            )
        return result.toMap()
    }

    /**
     * Routes a batch to the per-stream operation for [descriptor]. A null namespace falls back to
     * [defaultNamespace]. If the stream is not in [streamOpsMap] the batch is silently dropped
     * (`?.`) — presumably unreachable for streams in the catalog.
     */
    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        val streamConfig =
            parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
    }

    /**
     * Finalizes the streams present in [streamSyncSummaries] in parallel, then always shuts down
     * [executorService] (in `finally`), so this instance cannot be reused afterwards. Logs and
     * rethrows the first finalization failure, if any.
     */
    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        try {
            // Only call finalizeTable operations which has summary. rest will be skipped
            val finalizeFutures =
                streamSyncSummaries.entries
                    .map {
                        CompletableFuture.supplyAsync(
                            {
                                val streamConfig =
                                    parsedCatalog.getStream(
                                        it.key.namespace ?: defaultNamespace,
                                        it.key.name,
                                    )
                                streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
                            },
                            executorService,
                        )
                    }
                    .toList()
            val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
            exceptions.getResultsOrLogAndThrowFirst(
                "Following exceptions occurred while finalizing the sync",
                futuresResult,
            )
        } finally {
            log.info { "Cleaning up sync operation thread pools" }
            executorService.shutdown()
        }
    }
}

0 commit comments

Comments
 (0)