Skip to content

Commit b7dae03

Browse files
committed
generation id / sync id in cdk
1 parent aef0fc7 commit b7dae03

File tree

23 files changed

+190
-43
lines changed

23 files changed

+190
-43
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
package io.airbyte.cdk.integrations.base
55

6-
import java.util.*
6+
import java.util.Locale
77
import org.apache.commons.lang3.StringUtils
88

99
fun upperQuoted(column: String): String {
@@ -24,41 +24,60 @@ object JavaBaseConstants {
2424
const val COLUMN_NAME_DATA: String = "_airbyte_data"
2525
@JvmField
2626
val LEGACY_RAW_TABLE_COLUMNS: List<String> =
27-
java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
27+
listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
2828

2929
// destination v2
3030
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
3131
const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at"
3232
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
3333
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
34+
const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"
35+
36+
const val AIRBYTE_META_SYNC_ID_KEY = "sync_id"
3437

3538
// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
3639
// use this column list.
3740
@JvmField
3841
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
39-
java.util.List.of(
42+
listOf(
4043
COLUMN_NAME_AB_RAW_ID,
4144
COLUMN_NAME_AB_EXTRACTED_AT,
4245
COLUMN_NAME_AB_LOADED_AT,
4346
COLUMN_NAME_DATA,
4447
)
4548
@JvmField
4649
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
47-
java.util.List.of(
50+
listOf(
51+
COLUMN_NAME_AB_RAW_ID,
52+
COLUMN_NAME_AB_EXTRACTED_AT,
53+
COLUMN_NAME_AB_LOADED_AT,
54+
COLUMN_NAME_DATA,
55+
COLUMN_NAME_AB_META,
56+
)
57+
@JvmField
58+
val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List<String> =
59+
listOf(
4860
COLUMN_NAME_AB_RAW_ID,
4961
COLUMN_NAME_AB_EXTRACTED_AT,
5062
COLUMN_NAME_AB_LOADED_AT,
5163
COLUMN_NAME_DATA,
5264
COLUMN_NAME_AB_META,
65+
COLUMN_NAME_AB_GENERATION_ID,
5366
)
5467
@JvmField
5568
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
56-
java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)
69+
listOf(
70+
COLUMN_NAME_AB_RAW_ID,
71+
COLUMN_NAME_AB_EXTRACTED_AT,
72+
COLUMN_NAME_AB_META,
73+
COLUMN_NAME_AB_GENERATION_ID
74+
)
5775

5876
const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
5977
enum class DestinationColumns(val rawColumns: List<String>) {
6078
V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
6179
V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
80+
V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION),
6281
LEGACY(LEGACY_RAW_TABLE_COLUMNS)
6382
}
6483
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
5757
protected abstract fun writeRecord(
5858
recordString: String,
5959
airbyteMetaString: String,
60-
emittedAt: Long
60+
generationId: Long,
61+
emittedAt: Long,
6162
)
6263

6364
/**
@@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
99100
}
100101

101102
@Throws(Exception::class)
102-
override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
103+
override fun accept(
104+
recordString: String,
105+
airbyteMetaString: String,
106+
generationId: Long,
107+
emittedAt: Long
108+
): Long {
103109
if (!isStarted) {
104110
if (useCompression) {
105111
compressedBuffer = GzipCompressorOutputStream(byteCounter)
@@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
111117
}
112118
if (inputStream == null && !isClosed) {
113119
val startCount = byteCounter.count
114-
writeRecord(recordString, airbyteMetaString, emittedAt)
120+
writeRecord(recordString, airbyteMetaString, generationId, emittedAt)
115121
return byteCounter.count - startCount
116122
} else {
117123
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ interface SerializableBuffer : AutoCloseable {
3737
* @throws Exception
3838
*/
3939
@Throws(Exception::class)
40-
fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long
40+
fun accept(
41+
recordString: String,
42+
airbyteMetaString: String,
43+
generationId: Long,
44+
emittedAt: Long
45+
): Long
4146

4247
/** Flush a buffer implementation. */
4348
@Throws(Exception::class) fun flush()

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,10 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
146146
includeCdcDeletedAt: Boolean,
147147
streamId: StreamId,
148148
suffix: String?,
149-
records: List<JsonNode>
149+
records: List<JsonNode>,
150+
generationId: Long,
150151
) {
152+
// TODO handle generation ID
151153
val columnNames =
152154
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
153155
insertRecords(

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ class AvroSerializedBuffer(
5858

5959
@Throws(IOException::class)
6060
@Suppress("DEPRECATION")
61-
override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
61+
override fun writeRecord(
62+
recordString: String,
63+
airbyteMetaString: String,
64+
generationId: Long,
65+
emittedAt: Long
66+
) {
6267
// TODO Remove this double deserialization when S3 Destinations moves to Async.
6368
writeRecord(
6469
Jsons.deserialize(

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ abstract class BaseSheetGenerator : CsvSheetGenerator {
2828
id: UUID,
2929
formattedString: String,
3030
emittedAt: Long,
31-
formattedAirbyteMetaString: String
31+
formattedAirbyteMetaString: String,
32+
generationId: Long,
3233
): List<Any> {
3334
// TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or
3435
// RootLevelFlatteningSheetGenerator

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,19 @@ class CsvSerializedBuffer(
6868
}
6969

7070
@Throws(IOException::class)
71-
override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
71+
override fun writeRecord(
72+
recordString: String,
73+
airbyteMetaString: String,
74+
generationId: Long,
75+
emittedAt: Long
76+
) {
7277
csvPrinter!!.printRecord(
7378
csvSheetGenerator.getDataRow(
7479
UUID.randomUUID(),
7580
recordString,
7681
emittedAt,
7782
airbyteMetaString,
83+
generationId,
7884
),
7985
)
8086
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ interface CsvSheetGenerator {
2626
id: UUID,
2727
formattedString: String,
2828
emittedAt: Long,
29-
formattedAirbyteMetaString: String
29+
formattedAirbyteMetaString: String,
30+
generationId: Long,
3031
): List<Any>
3132

3233
object Factory {

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class StagingDatabaseCsvSheetGenerator
2929
@JvmOverloads
3030
constructor(
3131
private val destinationColumns: JavaBaseConstants.DestinationColumns =
32-
JavaBaseConstants.DestinationColumns.LEGACY
32+
JavaBaseConstants.DestinationColumns.LEGACY,
3333
) : CsvSheetGenerator {
3434
override fun getHeaderRow(): List<String> {
3535
return destinationColumns.rawColumns
@@ -40,7 +40,9 @@ constructor(
4040
id,
4141
Jsons.serialize(recordMessage.data),
4242
recordMessage.emittedAt,
43-
Jsons.serialize(recordMessage.meta)
43+
Jsons.serialize(recordMessage.meta),
44+
// Legacy code. Default to generation 0.
45+
0,
4446
)
4547
}
4648

@@ -52,7 +54,8 @@ constructor(
5254
id: UUID,
5355
formattedString: String,
5456
emittedAt: Long,
55-
formattedAirbyteMetaString: String
57+
formattedAirbyteMetaString: String,
58+
generationId: Long,
5659
): List<Any> {
5760
return when (destinationColumns) {
5861
JavaBaseConstants.DestinationColumns.LEGACY ->
@@ -67,6 +70,15 @@ constructor(
6770
)
6871
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META ->
6972
listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString)
73+
JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION ->
74+
listOf(
75+
id,
76+
Instant.ofEpochMilli(emittedAt),
77+
"",
78+
formattedString,
79+
formattedAirbyteMetaString,
80+
generationId
81+
)
7082
}
7183
}
7284
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ class JsonLSerializedBuffer(
6060
}
6161

6262
@Suppress("DEPRECATION")
63-
override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
63+
override fun writeRecord(
64+
recordString: String,
65+
airbyteMetaString: String,
66+
generationId: Long,
67+
emittedAt: Long
68+
) {
6469
// TODO Remove this double deserialization when S3 Destinations moves to Async.
6570
writeRecord(
6671
Jsons.deserialize(

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,12 @@ class ParquetSerializedBuffer(
122122
}
123123

124124
@Throws(Exception::class)
125-
override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
125+
override fun accept(
126+
recordString: String,
127+
airbyteMetaString: String,
128+
generationId: Long,
129+
emittedAt: Long
130+
): Long {
126131
throw UnsupportedOperationException(
127132
"This method is not supported for ParquetSerializedBuffer"
128133
)

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ internal class AsyncFlush(
6161
writer.accept(
6262
record!!.serialized!!,
6363
Jsons.serialize(record.record!!.meta),
64+
// Destinations that want to use generations should switch to the new
65+
// structure (e.g. StagingStreamOperations)
66+
0,
6467
record.record!!.emittedAt
6568
)
6669
} catch (e: Exception) {

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ private constructor(
277277
val tableName: String
278278
when (destinationColumns) {
279279
JavaBaseConstants.DestinationColumns.V2_WITH_META,
280-
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> {
280+
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META,
281+
JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> {
281282
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
282283
outputSchema = streamId.rawNamespace
283284
tableName = streamId.rawName

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,15 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
4242
it.accept(
4343
record.serialized!!,
4444
Jsons.serialize(record.record!!.meta),
45+
streamConfig.generationId,
4546
record.record!!.emittedAt
4647
)
4748
}
4849
it.flush()
4950
log.info {
5051
"Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
5152
}
52-
storageOperation.writeToStage(streamConfig.id, writeBuffer)
53+
storageOperation.writeToStage(streamConfig, writeBuffer)
5354
}
5455
}
5556
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.base.destination.operation
66

7+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY
78
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
89
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
910
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
@@ -15,6 +16,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1516
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
1617
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
1718
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
19+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
1820
import io.airbyte.protocol.models.v0.StreamDescriptor
1921
import io.github.oshai.kotlinlogging.KotlinLogging
2022
import java.util.concurrent.CompletableFuture
@@ -102,7 +104,22 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
102104
override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
103105
val streamConfig =
104106
parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
105-
streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
107+
streamOpsMap[streamConfig.id]?.writeRecords(
108+
streamConfig,
109+
stream.map { record ->
110+
if (record.record!!.meta == null) {
111+
record.record!!.meta = AirbyteRecordMessageMeta()
112+
}
113+
record.also {
114+
it.record!!
115+
.meta!!
116+
.setAdditionalProperty(
117+
AIRBYTE_META_SYNC_ID_KEY,
118+
streamConfig.syncId,
119+
)
120+
}
121+
},
122+
)
106123
}
107124

108125
override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
2525
disableTypeDedupe
2626
) {
2727
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
28-
storageOperation.writeToStage(streamConfig.id, stream)
28+
storageOperation.writeToStage(streamConfig, stream)
2929
}
3030
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ interface StorageOperation<Data> {
2424
fun cleanupStage(streamId: StreamId)
2525

2626
/** Write data to stage. */
27-
fun writeToStage(streamId: StreamId, data: Data)
27+
fun writeToStage(streamConfig: StreamConfig, data: Data)
2828

2929
/*
3030
* ==================== Final Table Operations ================================

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,19 @@ abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> : Destination
118118
if (
119119
!(schemaMatchesExpectation(
120120
existingV2AirbyteRawTable,
121-
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META
121+
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META,
122122
) ||
123123
schemaMatchesExpectation(
124124
existingV2AirbyteRawTable,
125-
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES
125+
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
126+
) ||
127+
schemaMatchesExpectation(
128+
existingV2AirbyteRawTable,
129+
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION,
126130
))
127131
) {
128132
throw UnexpectedSchemaException(
129-
"Destination V2 Raw Table does not match expected Schema"
133+
"Destination V2 Raw Table does not match expected Schema",
130134
)
131135
}
132136
}

0 commit comments

Comments
 (0)