Skip to content

Commit b47f80f

Browse files
authored
Destinations CDK: Add interfaces for operations by responsibility (#38107)
1 parent 20763c3 commit b47f80f

File tree

23 files changed

+1737
-299
lines changed

23 files changed

+1737
-299
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
176+
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
177178
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
178179
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
179180
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
180181
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
181-
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
182182
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
183183
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
184184
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ object JavaBaseConstants {
5757

5858
const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
5959
enum class DestinationColumns(val rawColumns: List<String>) {
60-
V2_WITH_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES),
61-
V2_WITHOUT_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
62-
LEGACY(JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS)
60+
V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
61+
V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
62+
LEGACY(LEGACY_RAW_TABLE_COLUMNS)
6363
}
6464
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import java.util.*
1111
* destinations framework; new implementations should always provide this information). If this
1212
* value is empty, consumers should assume that the sync wrote nonzero records for this stream.
1313
*/
14-
class StreamSyncSummary(val recordsWritten: Optional<Long>) {
14+
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {
1515

1616
companion object {
1717
@JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/**
 * Destination connector sync operations. Any initialization required for the connector should be
 * done as part of instantiation/init blocks, so that an instance is ready to flush as soon as it
 * is constructed.
 */
interface SyncOperation {

    /**
     * This function is a shim for
     * [io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction]. After
     * control is returned from this method, it should be assumed that the data is committed to
     * durable storage and that any State message acknowledgements may be sent back.
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    /**
     * Finalize streams, which could involve typing/deduping or any other housekeeping tasks
     * required at the end of the sync, keyed by each stream's summary of what was written.
     */
    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.35.5
1+
version=0.35.6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.cdk.integrations.destination.staging

import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator

/**
 * Factory which can create an instance of concrete SerializedBuffer for one-time use before buffer
 * is closed. [io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory] is almost similar
 * which needs to be unified. That doesn't work well with our DV2 staging destinations, which mostly
 * support CSV only.
 */
object StagingSerializedBufferFactory {

    /**
     * Creates a fresh, single-use buffer for the given upload format and raw-table column set.
     * Only [FileUploadFormat.CSV] is currently supported; any other format hits the TODO.
     */
    fun initializeBuffer(
        fileUploadFormat: FileUploadFormat,
        destinationColumns: JavaBaseConstants.DestinationColumns
    ): SerializableBuffer =
        when (fileUploadFormat) {
            FileUploadFormat.CSV ->
                CsvSerializedBuffer(
                    FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
                    StagingDatabaseCsvSheetGenerator(destinationColumns),
                    true,
                )
            else -> TODO("Only CSV is supported for Staging format")
        }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.cdk.integrations.destination.staging.operation

import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.stream.Stream
import org.apache.commons.io.FileUtils

/**
 * Stream operation for staging-based destinations: records are serialized into a one-time
 * [SerializableBuffer] and uploaded to the stage via the provided [StorageOperation].
 */
class StagingStreamOperations<DestinationState : MinimumDestinationState>(
    private val storageOperation: StorageOperation<SerializableBuffer>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    private val fileUploadFormat: FileUploadFormat,
    private val destinationColumns: JavaBaseConstants.DestinationColumns,
    disableTypeDedupe: Boolean = false
) :
    AbstractStreamOperation<DestinationState, SerializableBuffer>(
        storageOperation,
        destinationInitialStatus,
        disableTypeDedupe
    ) {

    private val log = KotlinLogging.logger {}

    /**
     * Serializes every record in [stream] into a fresh buffer, flushes it, and hands the filled
     * buffer to the storage operation for upload to staging. The buffer is closed afterwards.
     */
    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
        StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns).use {
            writeBuffer ->
            stream.forEach { message ->
                writeBuffer.accept(
                    message.serialized!!,
                    Jsons.serialize(message.record!!.meta),
                    message.record!!.emittedAt
                )
            }
            writeBuffer.flush()
            log.info {
                "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(writeBuffer.byteCount)}) to staging"
            }
            storageOperation.writeToStage(streamConfig.id, writeBuffer)
        }
    }
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ dependencies {
2222
testFixturesApi testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core'))
2323
testFixturesImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
2424
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
25+
testImplementation "io.mockk:mockk:1.13.11"
2526
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Optional
import java.util.stream.Stream

/**
 * Base implementation of [StreamOperation]. On construction it prepares the stage and (unless
 * [disableTypeDedupe] is set) the final namespace/table for the stream; [finalizeTable] later
 * cleans up the stage and runs typing/deduping as needed. [writeRecords] is left to concrete
 * subclasses (e.g. direct insert vs. staging-based).
 */
abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState, Data>(
    private val storageOperation: StorageOperation<Data>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    private val disableTypeDedupe: Boolean = false
) : StreamOperation<DestinationState> {
    private val log = KotlinLogging.logger {}

    // State maintained to make decision between async calls
    // Suffix of the final table we write to ("" when writing directly; "_airbyte_tmp" for
    // OVERWRITE syncs that go through a temp table).
    private val finalTmpTableSuffix: String
    private val initialRawTableStatus: InitialRawTableStatus =
        destinationInitialStatus.initialRawTableStatus

    /**
     * After running any sync setup code, we may update the destination state. This field holds that
     * updated destination state.
     */
    final override val updatedDestinationState: DestinationState

    init {
        val stream = destinationInitialStatus.streamConfig
        storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
        if (!disableTypeDedupe) {
            storageOperation.createFinalNamespace(stream.id)
            // Prepare final tables based on sync mode.
            finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
        } else {
            log.info { "Typing and deduping disabled, skipping final table initialization" }
            finalTmpTableSuffix = NO_SUFFIX
        }
        // Any soft reset requested by setup has been handled above, so clear the flag.
        updatedDestinationState = destinationInitialStatus.destinationState.withSoftReset(false)
    }

    companion object {
        private const val NO_SUFFIX = ""
        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
    }

    /**
     * Ensures the final table exists and is in a writable state for this sync mode.
     *
     * @return the table suffix to write to: [NO_SUFFIX] for direct writes, or
     * [TMP_OVERWRITE_TABLE_SUFFIX] when an OVERWRITE sync must stage into a temp table.
     */
    private fun prepareFinalTable(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        // No special handling if final table doesn't exist, just create and return
        if (!initialStatus.isFinalTablePresent) {
            log.info {
                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
            }
            storageOperation.createFinalTable(stream, NO_SUFFIX, false)
            return NO_SUFFIX
        }

        log.info { "Final Table exists for stream ${stream.id.finalName}" }
        // The table already exists. Decide whether we're writing to it directly, or
        // using a tmp table.
        when (stream.destinationSyncMode) {
            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
            DestinationSyncMode.APPEND,
            DestinationSyncMode.APPEND_DEDUP -> {
                if (
                    initialStatus.isSchemaMismatch ||
                        initialStatus.destinationState.needsSoftReset()
                ) {
                    // We're loading data directly into the existing table.
                    // Make sure it has the right schema.
                    // Also, if a raw table migration wants us to do a soft reset, do that
                    // here.
                    log.info { "Executing soft-reset on final table of stream $stream" }
                    storageOperation.softResetFinalTable(stream)
                }
                return NO_SUFFIX
            }
        }
    }

    /**
     * OVERWRITE-specific final table preparation: write directly when the existing table is empty
     * and matches the expected schema; otherwise stage into a suffixed temp table that will
     * replace the final table at the end of the sync.
     */
    private fun prepareFinalTableForOverwrite(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
            // overwrite an existing tmp table if needed.
            storageOperation.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
            log.info {
                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
            }
            // We want to overwrite an existing table. Write into a tmp table.
            // We'll overwrite the table at the
            // end of the sync.
            return TMP_OVERWRITE_TABLE_SUFFIX
        }

        log.info {
            "Final Table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
        }
        return NO_SUFFIX
    }

    /** Write records will be destination type specific, Insert vs staging based on format */
    abstract override fun writeRecords(
        streamConfig: StreamConfig,
        stream: Stream<PartialAirbyteMessage>
    )

    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
        // Delete staging directory, implementation will handle if it has to do it or not or a No-OP
        storageOperation.cleanupStage(streamConfig.id)
        if (disableTypeDedupe) {
            log.info {
                "Typing and deduping disabled, skipping final table finalization. " +
                    "Raw records can be found at ${streamConfig.id.rawNamespace}.${streamConfig.id.rawName}"
            }
            return
        }

        // For OVERWRITE syncs, unprocessed records from a previous sync are irrelevant.
        val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
        // Legacy logic: if recordsWritten is not tracked (empty Optional), assume it could be
        // non-zero and run T+D. Otherwise run it when this sync wrote records, or when a prior
        // sync left unprocessed raw records (non-OVERWRITE only).
        val shouldRunTypingDeduping =
            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
                (initialRawTableStatus.hasUnprocessedRecords && isNotOverwriteSync)
        if (!shouldRunTypingDeduping) {
            log.info {
                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
                    "because it had no records during this sync and no unprocessed records from a previous sync."
            }
        } else {
            // In overwrite mode, we want to read all the raw records. Typically, this is equivalent
            // to filtering on timestamp, but might as well be explicit.
            val timestampFilter =
                if (isNotOverwriteSync) {
                    initialRawTableStatus.maxProcessedTimestamp
                } else {
                    Optional.empty()
                }
            storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
        }

        // For overwrite, it's wasteful to do T+D, so we don't do soft-reset in prepare. Instead, we
        // do
        // type-dedupe
        // on a suffixed table and do a swap here when we have to for schema mismatches
        if (
            streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
                finalTmpTableSuffix.isNotBlank()
        ) {
            storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/**
 * Default [DestinationFlushFunction] implementation that forwards every flush request for a
 * stream to the provided [SyncOperation].
 */
class DefaultFlush(
    override val optimalBatchSizeBytes: Long,
    private val syncOperation: SyncOperation
) : DestinationFlushFunction {
    /** Delegate the flush of this stream's messages to the sync operation. */
    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) =
        syncOperation.flushStream(streamDescriptor, stream)
}

0 commit comments

Comments
 (0)